package com.zr.eim.kafka;

import com.alibaba.fastjson2.JSON;
import com.zr.eim.domain.EimDataCloseContactInfo;
import com.zr.eim.service.IEimDataCloseContactInfoService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class EimAnalysisCloseContactInfoListener {

    @Resource
    private IEimDataCloseContactInfoService closeContactInfoService;

//    @KafkaListener(topics = "CloseContactInfo")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack){
        String value = record.value();

        EimDataCloseContactInfo eimDataCloseContactInfo = JSON.parseObject(value, EimDataCloseContactInfo.class);

        closeContactInfoService.addCloseContactInfo(eimDataCloseContactInfo);

        System.out.println(value);
        System.out.println(record);
        //手动提交offset
        ack.acknowledge();
    }


}
